// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"testing"

	"github.com/cockroachdb/errors"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/tidwall/gjson"
	"google.golang.org/grpc"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/milvus-io/milvus/internal/datacoord/session"
	"github.com/milvus-io/milvus/internal/json"
	"github.com/milvus-io/milvus/internal/metastore/model"
	"github.com/milvus-io/milvus/internal/mocks"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
	"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

func TestGetDataNodeMetrics(t *testing.T) {
	ctx := context.Background()

	mockNodeManager := session.NewMockNodeManager(t)
	creator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, req *milvuspb.GetMetricsRequest, option ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) {
			const nodeID = 100
			nodeInfos := metricsinfo.DataNodeInfos{
				BaseComponentInfos: metricsinfo.BaseComponentInfos{
					Name: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
					ID:   nodeID,
				},
			}
			resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
			if err != nil {
				return &milvuspb.GetMetricsResponse{
					Status: &commonpb.Status{
						ErrorCode: commonpb.ErrorCode_UnexpectedError,
						Reason:    err.Error(),
					},
					Response:      "",
					ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
				}, nil
			}
			return &milvuspb.GetMetricsResponse{
				Status:        merr.Success(),
				Response:      resp,
				ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
			}, nil
		})
		return dn, nil
	}

	mockNodeManager.EXPECT().Startup(mock.Anything, mock.Anything).Return(nil)
	mockNodeManager.EXPECT().GetClientIDs().Return([]int64{1})
	mockNodeManager.EXPECT().GetClient(mock.Anything).RunAndReturn(func(nodeID int64) (types.DataNodeClient, error) {
		switch nodeID {
		case 100:
			return creator(ctx, "127.0.0.1:10086", nodeID)
		case 101:
			dn := mocks.NewMockDataNodeClient(t)
			dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("mocked fail"))
			return dn, nil
		case 102:
			dn := mocks.NewMockDataNodeClient(t)
			dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
				Status: merr.Status(errors.New("mocked error")),
			}, nil)
			return dn, nil
		case 103:
			dn := mocks.NewMockDataNodeClient(t)
			dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
				Status:   merr.Success(),
				Response: `{"error_reason": 1}`,
			}, nil)
			return dn, nil

		default:
			return nil, errors.New("node not found")
		}
	})

	svr := newTestServer(t, func(svr *Server) {
		svr.nodeManager = mockNodeManager
	})
	defer closeTestServer(t, svr)
	// nil node
	req := &milvuspb.GetMetricsRequest{}
	_, err := svr.getDataNodeMetrics(ctx, req, 0)
	assert.Error(t, err)

	// mock datanode client
	info, err := svr.getDataNodeMetrics(ctx, req, 100)
	assert.NoError(t, err)
	assert.False(t, info.HasError)
	assert.Equal(t, metricsinfo.ConstructComponentName(typeutil.DataNodeRole, 100), info.BaseComponentInfos.Name)

	info, err = svr.getDataNodeMetrics(ctx, req, 101)
	assert.NoError(t, err)
	assert.True(t, info.HasError)

	info, err = svr.getDataNodeMetrics(ctx, req, 102)
	assert.NoError(t, err)
	assert.True(t, info.HasError)
	assert.Equal(t, "mocked error", info.ErrorReason)

	info, err = svr.getDataNodeMetrics(ctx, req, 103)
	assert.NoError(t, err)
	assert.True(t, info.HasError)
}

func TestGetIndexNodeMetrics(t *testing.T) {
	svr := newTestServer(t)
	defer closeTestServer(t, svr)

	ctx := context.Background()
	req := &milvuspb.GetMetricsRequest{}
	// nil node
	_, err := svr.getIndexNodeMetrics(ctx, req, nil)
	assert.Error(t, err)

	// return error
	dn := mocks.NewMockDataNodeClient(t)
	dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("mock error"))
	info, err := svr.getIndexNodeMetrics(ctx, req, dn)
	assert.NoError(t, err)
	assert.True(t, info.HasError)

	// failed
	mockErr := errors.New("mocked error")
	dn = mocks.NewMockDataNodeClient(t)
	dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
		Status:        merr.Status(mockErr),
		ComponentName: "indexnode100",
	}, nil)
	info, err = svr.getIndexNodeMetrics(ctx, req, dn)
	assert.NoError(t, err)
	assert.True(t, info.HasError)
	assert.Equal(t, metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, 100), info.BaseComponentInfos.Name)

	// return unexpected
	dn = mocks.NewMockDataNodeClient(t)
	dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{
		Status:        merr.Success(),
		Response:      "XXXXXXXXXXXXX",
		ComponentName: "indexnode100",
	}, nil)
	info, err = svr.getIndexNodeMetrics(ctx, req, dn)
	assert.NoError(t, err)
	assert.True(t, info.HasError)
	assert.Equal(t, metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, 100), info.BaseComponentInfos.Name)

	// success
	dn = mocks.NewMockDataNodeClient(t)
	dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, req *milvuspb.GetMetricsRequest, option ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) {
		nodeID = UniqueID(100)

		nodeInfos := metricsinfo.DataNodeInfos{
			BaseComponentInfos: metricsinfo.BaseComponentInfos{
				Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, nodeID),
				ID:   nodeID,
			},
		}
		resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
		if err != nil {
			return &milvuspb.GetMetricsResponse{
				Status:        merr.Status(err),
				ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, nodeID),
			}, nil
		}

		return &milvuspb.GetMetricsResponse{
			Status:        merr.Success(),
			Response:      resp,
			ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, nodeID),
		}, nil
	})
	info, err = svr.getIndexNodeMetrics(ctx, req, dn)

	assert.NoError(t, err)
	assert.False(t, info.HasError)
	assert.Equal(t, metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, 100), info.BaseComponentInfos.Name)
}

func TestGetSyncTaskMetrics(t *testing.T) {
	svr := Server{}
	t.Run("ReturnsCorrectJSON", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		tasks := []metricsinfo.SyncTask{
			{
				SegmentID:     1,
				BatchRows:     100,
				SegmentLevel:  "L0",
				TSFrom:        "t1",
				TSTo:          "t2",
				DeltaRowCount: 10,
				FlushSize:     1024,
				RunningTime:   "2h",
			},
		}
		tasksBytes, err := json.Marshal(tasks)
		assert.NoError(t, err)
		expectedJSON := string(tasksBytes)

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: expectedJSON,
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		actualJSON, err := svr.getSyncTaskJSON(ctx, req)
		assert.NoError(t, err)
		assert.Equal(t, expectedJSON, actualJSON)
	})

	t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("request failed"))

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		actualJSON, err := svr.getSyncTaskJSON(ctx, req)
		assert.Error(t, err)
		assert.Equal(t, "", actualJSON)
	})

	t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: `invalid json`,
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		actualJSON, err := svr.getSyncTaskJSON(ctx, req)
		assert.Error(t, err)
		assert.Equal(t, "", actualJSON)
	})

	t.Run("ReturnsEmptyJSONWhenNoTasks", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: "",
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		expectedJSON := "null"
		actualJSON, err := svr.getSyncTaskJSON(ctx, req)
		assert.NoError(t, err)
		assert.Equal(t, expectedJSON, actualJSON)
	})
}

func TestGetSegmentsJSON(t *testing.T) {
	svr := Server{}
	t.Run("ReturnsCorrectJSON", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		segments := []*metricsinfo.Segment{
			{
				SegmentID:    1,
				CollectionID: 100,
				PartitionID:  10,
				NumOfRows:    1000,
				State:        "Flushed",
			},
		}
		segmentsBytes, err := json.Marshal(segments)
		assert.NoError(t, err)
		expectedJSON := string(segmentsBytes)

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: expectedJSON,
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req)
		assert.NoError(t, err)
		assert.Equal(t, expectedJSON, actualJSON)
	})

	t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("request failed"))

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req)
		assert.Error(t, err)
		assert.Equal(t, "", actualJSON)
	})

	t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: `invalid json`,
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req)
		assert.Error(t, err)
		assert.Equal(t, "", actualJSON)
	})

	t.Run("ReturnsEmptyJSONWhenNoSegments", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: "",
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		expectedJSON := "null"
		actualJSON, err := svr.getDataNodeSegmentsJSON(ctx, req)
		assert.NoError(t, err)
		assert.Equal(t, expectedJSON, actualJSON)
	})
}

func TestGetChannelsJSON(t *testing.T) {
	svr := Server{}
	t.Run("ReturnsCorrectJSON", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}

		channels := []*metricsinfo.Channel{
			{
				Name:         "channel1",
				CollectionID: 100,
				NodeID:       1,
			},
		}
		channelsBytes, err := json.Marshal(channels)
		assert.NoError(t, err)
		channelJSON := string(channelsBytes)

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: channelJSON,
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		svr.meta = &meta{channelCPs: newChannelCps()}
		svr.meta.channelCPs.checkpoints["channel1"] = &msgpb.MsgPosition{Timestamp: 1000}

		actualJSON, err := svr.getChannelsJSON(context.TODO(), req)
		assert.NoError(t, err)

		channels = []*metricsinfo.Channel{
			{
				Name:         "channel1",
				CollectionID: 100,
				NodeID:       1,
				CheckpointTS: tsoutil.PhysicalTimeFormat(1000),
			},
		}
		channelsBytes, err = json.Marshal(channels)
		assert.NoError(t, err)
		expectedJSON := string(channelsBytes)

		assert.Equal(t, expectedJSON, actualJSON)
	})

	t.Run("ReturnsErrorOnRequestFailure", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("request failed"))

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		svr.meta = &meta{channelCPs: newChannelCps()}

		actualJSON, err := svr.getChannelsJSON(ctx, req)
		assert.Error(t, err)
		assert.Equal(t, "", actualJSON)
	})

	t.Run("ReturnsErrorOnUnmarshalFailure", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: `invalid json`,
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		svr.meta = &meta{channelCPs: newChannelCps()}

		actualJSON, err := svr.getChannelsJSON(ctx, req)
		assert.Error(t, err)
		assert.Equal(t, "", actualJSON)
	})

	t.Run("ReturnsEmptyJSONWhenNoChannels", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: "",
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		svr.nodeManager = nodeManager

		svr.meta = &meta{channelCPs: newChannelCps()}

		expectedJSON := "null"
		actualJSON, err := svr.getChannelsJSON(ctx, req)
		assert.NoError(t, err)
		assert.Equal(t, expectedJSON, actualJSON)
	})
}

func TestGetDistJSON(t *testing.T) {
	svr := Server{}
	nodeID := paramtable.GetNodeID()
	paramtable.SetNodeID(1)
	defer paramtable.SetNodeID(nodeID)

	t.Run("ReturnsCorrectJSON", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		svr.meta = &meta{
			segments: &SegmentsInfo{
				segments: map[int64]*SegmentInfo{
					1: {
						SegmentInfo: &datapb.SegmentInfo{
							ID:            1,
							CollectionID:  1,
							PartitionID:   1,
							InsertChannel: "channel1",
							Level:         datapb.SegmentLevel_L1,
							State:         commonpb.SegmentState_Flushed,
						},
					},
				},
			},
		}

		segments := []*metricsinfo.Segment{
			{
				SegmentID:    1,
				State:        commonpb.SegmentState_Flushed.String(),
				CollectionID: 1,
				PartitionID:  1,
				Channel:      "channel1",
				Level:        datapb.SegmentLevel_L1.String(),
				NodeID:       1,
			},
		}
		dist := &metricsinfo.DataCoordDist{
			Segments: segments,
		}
		distBytes, err := json.Marshal(dist)
		assert.NoError(t, err)
		expectedJSON := string(distBytes)

		actualJSON := svr.getDistJSON(ctx, req)
		assert.Equal(t, expectedJSON, actualJSON)
	})

	t.Run("ReturnsEmptyJSONWhenNoDist", func(t *testing.T) {
		req := &milvuspb.GetMetricsRequest{}
		ctx := context.Background()

		svr.meta = &meta{segments: &SegmentsInfo{segments: map[int64]*SegmentInfo{}}}
		expectedJSON := "{}"
		actualJSON := svr.getDistJSON(ctx, req)
		assert.Equal(t, expectedJSON, actualJSON)
	})
}

func TestServer_getSegmentsJSON(t *testing.T) {
	segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
	segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
	segIdx0.Insert(10, &model.SegmentIndex{
		SegmentID:           1000,
		CollectionID:        1,
		PartitionID:         2,
		NumRows:             10250,
		IndexID:             10,
		BuildID:             10000,
		NodeID:              1,
		IndexVersion:        0,
		IndexState:          commonpb.IndexState_Finished,
		FailReason:          "",
		IsDeleted:           false,
		CreatedUTCTime:      12,
		IndexFileKeys:       nil,
		IndexSerializedSize: 0,
	})
	segIndexes.Insert(1000, segIdx0)
	s := &Server{
		meta: &meta{
			segments: &SegmentsInfo{
				segments: map[int64]*SegmentInfo{
					1: {
						SegmentInfo: &datapb.SegmentInfo{
							ID:            1,
							CollectionID:  1,
							PartitionID:   2,
							InsertChannel: "channel1",
						},
					},
				},
			},
			indexMeta: &indexMeta{
				segmentIndexes: segIndexes,
				indexes: map[UniqueID]map[UniqueID]*model.Index{
					1: {
						10: &model.Index{
							CollectionID: 1,
							FieldID:      100,
							IndexID:      10,
							IndexName:    "test_index",
							IsDeleted:    false,
						},
					},
				},
			},
		},
	}

	ctx := context.TODO()
	req := &milvuspb.GetMetricsRequest{}
	t.Run("valid request in dc", func(t *testing.T) {
		jsonReq := gjson.Parse(`{"in": "dc", "collection_id": 1}`)
		result, err := s.getSegmentsJSON(ctx, req, jsonReq)
		assert.NoError(t, err)
		assert.NotEmpty(t, result)
	})

	t.Run("invalid request", func(t *testing.T) {
		jsonReq := gjson.Parse(`{"in": "invalid"}`)
		result, err := s.getSegmentsJSON(ctx, req, jsonReq)
		assert.Error(t, err)
		assert.Empty(t, result)
	})

	t.Run("vaild request in dn", func(t *testing.T) {
		segments := []*metricsinfo.Segment{
			{
				SegmentID:    1,
				CollectionID: 100,
				PartitionID:  10,
				NumOfRows:    1000,
				State:        "Flushed",
			},
		}
		segmentsBytes, err := json.Marshal(segments)
		assert.NoError(t, err)
		expectedJSON := string(segmentsBytes)

		mockResp := &milvuspb.GetMetricsResponse{
			Status:   merr.Success(),
			Response: expectedJSON,
		}

		dn := mocks.NewMockDataNodeClient(t)
		dn.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(mockResp, nil)

		nodeManager := session.NewMockNodeManager(t)
		nodeManager.EXPECT().GetClient(mock.Anything).Return(dn, nil)
		nodeManager.EXPECT().GetClientIDs().Return([]int64{1})
		s.nodeManager = nodeManager

		jsonReq := gjson.Parse(`{"in": "dn"}`)
		result, err := s.getSegmentsJSON(ctx, req, jsonReq)
		assert.NoError(t, err)
		assert.NotEmpty(t, result)

		jsonReq = gjson.Parse(`{}`)
		result, err = s.getSegmentsJSON(ctx, req, jsonReq)
		assert.NoError(t, err)
		assert.NotEmpty(t, result)
	})
}
